package org.budo.warehouse.logic.consumer;

import java.util.ArrayList;
import java.util.List;

import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.producer.DataMessageImpl;
import org.budo.warehouse.service.entity.Pipeline;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * A {@link DataConsumer} that re-wraps every entry of an incoming message as a
 * {@link DataEntryPojo} addressed to the pipeline's target table and schema,
 * then hands the rebuilt message to the wrapped {@link #dataConsumer}.
 *
 * @author limingwei
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class DataEntryPojoConsumer implements DataConsumer {
    /** Downstream consumer that receives the rebuilt message. */
    private DataConsumer dataConsumer;

    /** Supplies the target table, target schema and target data node id. */
    private Pipeline pipeline;

    /**
     * Converts each entry of {@code dataMessage} and forwards the result.
     *
     * The forwarded message is a {@link DataMessageImpl} built from
     * {@code pipeline.getTargetDataNodeId()} and the converted entries, in
     * the same order as they appear in the incoming message.
     *
     * @param dataMessage the message whose entries are converted
     */
    @Override
    public void consume(DataMessage dataMessage) {
        List<DataEntry> converted = new ArrayList<DataEntry>();

        for (DataEntry source : dataMessage.getDataEntries()) {
            converted.add(this.toTargetPojo(source));
        }

        DataMessageImpl outgoing = new DataMessageImpl(pipeline.getTargetDataNodeId(), converted);
        dataConsumer.consume(outgoing);
    }

    /**
     * Builds the POJO form of one entry, with its table and schema names
     * replaced by the pipeline's target values.
     *
     * Per the original author's notes, the POJO form is used for serialized
     * transport and the target names are set so a following step can match on
     * them.
     */
    private DataEntryPojo toTargetPojo(DataEntry source) {
        return new DataEntryPojo(source) // POJO copy, for serialized transport
                .setTableName(pipeline.getTargetTable()) // used by the next step for matching
                .setSchemaName(pipeline.getTargetSchema());
    }
}